/*!
 * \file WtDtRunner.cpp
 * \project	WonderTrader
 *
 * \author Wesley
 * \date 2020/03/30
 * 
 * \brief WonderTrader数据运行器实现
 * 
 * \details 本文件实现了WonderTrader数据服务系统的核心运行器类WtDtRunner：
 *          - 数据服务系统的主控制器和协调器实现
 *          - 管理数据解析器（Parser）和数据导出器（Dumper）的完整生命周期
 *          - 实现扩展Parser接口，支持自定义行情解析器
 *          - 实现扩展Dumper接口，支持自定义数据导出器
 *          - 集成数据管理器、状态监控、UDP广播等核心组件
 *          - 提供异步IO支持和多线程处理能力
 *          - 支持实时行情数据处理和历史数据导出
 *          - 支持Level2数据（订单队列、订单明细、逐笔成交）处理
 *          - 支持基础数据管理（合约、商品、交易时段、节假日）
 *          - 支持配置文件加载和系统初始化
 */
#include "WtDtRunner.h"
#include "ExpParser.h"

#include "../WtDtCore/WtHelper.h"

#include "../Includes/WTSSessionInfo.hpp"
#include "../Includes/WTSVariant.hpp"
#include "../Includes/WTSDataDef.hpp"

#include "../Share/StrUtil.hpp"

#include "../WTSUtils/WTSCfgLoader.h"
#include "../WTSTools/WTSLogger.h"
#include "../WTSUtils/SignalHook.hpp"


/**
 * @brief 构造函数
 * @details 初始化数据运行器，设置默认参数和信号处理
 */
WtDtRunner::WtDtRunner()
	: _dumper_for_bars(NULL)
	, _dumper_for_ticks(NULL)
	, _dumper_for_ordque(NULL)
	, _dumper_for_orddtl(NULL)
	, _dumper_for_trans(NULL)
{
#if _WIN32
// Signal hooks disabled in WIN32
#else
// Signal hooks enabled in UNIX
	install_signal_hooks([](const char* message) {
		WTSLogger::error(message);
	});
#endif
}

/**
 * @brief 析构函数
 * @details 清理资源，释放所有组件
 */
WtDtRunner::~WtDtRunner()
{
}

void WtDtRunner::start(bool bAsync /* = false */, bool bAlldayMode /* = false */)
{
	_parsers.run();

    if(!bAsync)
    {
		boost::asio::post(_async_io, [this]() {
			std::this_thread::sleep_for(std::chrono::milliseconds(5));
			_state_mon.run();
		});

        	auto work = boost::asio::make_work_guard(_async_io);
        _async_io.run();
    }
	else
	{
		std::this_thread::sleep_for(std::chrono::milliseconds(5));
		_state_mon.run();
	}
}

void WtDtRunner::initialize(const char* cfgFile, const char* logCfg, const char* modDir /* = "" */)
{
	WTSLogger::init(logCfg);
	WtHelper::set_module_dir(modDir);

	WTSVariant* config = WTSCfgLoader::load_from_file(cfgFile, true);
	if(config == NULL)
	{
		WTSLogger::error_f("Loading config file {} failed", cfgFile);
		return;
	}

	//���������ļ�
	WTSVariant* cfgBF = config->get("basefiles");
	bool isUTF8 = cfgBF->getBoolean("utf-8");
	if (cfgBF->get("session"))
	{
		_bd_mgr.loadSessions(cfgBF->getCString("session"), isUTF8);
		WTSLogger::info("Trading sessions loaded");
	}

	WTSVariant* cfgItem = cfgBF->get("commodity");
	if (cfgItem)
	{
		if (cfgItem->type() == WTSVariant::VT_String)
		{
			_bd_mgr.loadCommodities(cfgItem->asCString(), isUTF8);
		}
		else if (cfgItem->type() == WTSVariant::VT_Array)
		{
			for (uint32_t i = 0; i < cfgItem->size(); i++)
			{
				_bd_mgr.loadCommodities(cfgItem->get(i)->asCString(), isUTF8);
			}
		}
	}

	cfgItem = cfgBF->get("contract");
	if (cfgItem)
	{
		if (cfgItem->type() == WTSVariant::VT_String)
		{
			_bd_mgr.loadContracts(cfgItem->asCString(), isUTF8);
		}
		else if (cfgItem->type() == WTSVariant::VT_Array)
		{
			for (uint32_t i = 0; i < cfgItem->size(); i++)
			{
				_bd_mgr.loadContracts(cfgItem->get(i)->asCString(), isUTF8);
			}
		}
	}

	if (cfgBF->get("holiday"))
	{
		_bd_mgr.loadHolidays(cfgBF->getCString("holiday"));
		WTSLogger::info("Holidays loaded");
	}

    /*
     *  By Wesley @ 2021.12.27
     *  �������ʵ���ϲ���Ҫ��������������Լ
     */
//	if (cfgBF->get("hot"))//	{
//		_hot_mgr.loadHots(cfgBF->getCString("hot"));
//		WTSLogger::info("Hot rules loaded");
//	}
//
//	if (cfgBF->get("second"))
//	{
//		_hot_mgr.loadSeconds(cfgBF->getCString("second"));
//		WTSLogger::info("Second rules loaded");
//	}

	_udp_caster.init(config->get("broadcaster"), &_bd_mgr, &_data_mgr);

	//By Wesley @ 2021.12.27
	//ȫ���ģʽ������Ҫ��ʹ��״̬��
	bool bAlldayMode = config->getBoolean("allday");
	if (!bAlldayMode)
	{
		_state_mon.initialize(config->getCString("statemonitor"), &_bd_mgr, &_data_mgr);
	}
	else
	{
		WTSLogger::info("QuoteFactory will run in allday mode");
	}

	initDataMgr(config->get("writer"), bAlldayMode);

	if (config->has("parsers"))
		initParsers(config->getCString("parsers"));
	else
		WTSLogger::log_raw(LL_WARN, "No parsers config, skipped loading parsers");

	config->release();
}

void WtDtRunner::initDataMgr(WTSVariant* config, bool bAlldayMode /* = false */)
{
	_data_mgr.init(config, &_bd_mgr, bAlldayMode ? NULL : &_state_mon, &_udp_caster);
}

void WtDtRunner::initParsers(const char* filename)
{
	WTSVariant* config = WTSCfgLoader::load_from_file(filename, true);
	if(config == NULL)
	{
		WTSLogger::error_f("Loading parser file {} failed", filename);
		return;
	}

	WTSVariant* cfg = config->get("parsers");

	for (uint32_t idx = 0; idx < cfg->size(); idx++)
	{
		WTSVariant* cfgItem = cfg->get(idx);
		if (!cfgItem->getBoolean("active"))
			continue;

		const char* id = cfgItem->getCString("id");

		// By Wesley @ 2021.12.14
		// ���idΪ�գ��������Զ�id
		std::string realid = id;
		if (realid.empty())
		{
			static uint32_t auto_parserid = 1000;
			realid = StrUtil::printf("auto_parser_%u", auto_parserid++);
		}
		
		ParserAdapterPtr adapter(new ParserAdapter(&_bd_mgr, &_data_mgr));
		adapter->init(realid.c_str(), cfgItem);
		_parsers.addAdapter(realid.c_str(), adapter);
	}

	WTSLogger::info("%u market data parsers loaded in total", _parsers.size());
	config->release();
}

#pragma region "Extended Parser"
void WtDtRunner::registerParserPorter(FuncParserEvtCallback cbEvt, FuncParserSubCallback cbSub)
{
	_cb_parser_evt = cbEvt;
	_cb_parser_sub = cbSub;

	WTSLogger::info("Callbacks of Extented Parser registration done");
}

void WtDtRunner::parser_init(const char* id)
{
	if (_cb_parser_evt)
		_cb_parser_evt(EVENT_PARSER_INIT, id);
}

void WtDtRunner::parser_connect(const char* id)
{
	if (_cb_parser_evt)
		_cb_parser_evt(EVENT_PARSER_CONNECT, id);
}

void WtDtRunner::parser_disconnect(const char* id)
{
	if (_cb_parser_evt)
		_cb_parser_evt(EVENT_PARSER_DISCONNECT, id);
}

void WtDtRunner::parser_release(const char* id)
{
	if (_cb_parser_evt)
		_cb_parser_evt(EVENT_PARSER_RELEASE, id);
}

void WtDtRunner::parser_subscribe(const char* id, const char* code)
{
	if (_cb_parser_sub)
		_cb_parser_sub(id, code, true);
}

void WtDtRunner::parser_unsubscribe(const char* id, const char* code)
{
	if (_cb_parser_sub)
		_cb_parser_sub(id, code, false);
}

void WtDtRunner::on_ext_parser_quote(const char* id, WTSTickStruct* curTick, uint32_t uProcFlag)
{
	ParserAdapterPtr adapter = _parsers.getAdapter(id);
	if (adapter)
	{
		WTSTickData* newTick = WTSTickData::create(*curTick);
		adapter->handleQuote(newTick, uProcFlag);
		newTick->release();
	}
	else
	{
		WTSLogger::warn_f("Parser {} not exists", id);
	}
}

bool WtDtRunner::createExtParser(const char* id)
{
	ParserAdapterPtr adapter(new ParserAdapter(&_bd_mgr, &_data_mgr));
	ExpParser* parser = new ExpParser(id);
	adapter->initExt(id, parser);
	_parsers.addAdapter(id, adapter);
	WTSLogger::info("Extended parser %s created", id);
	return true;
}

#pragma endregion 

bool WtDtRunner::createExtDumper(const char* id)
{
	ExpDumperPtr dumper(new ExpDumper(id));
	_dumpers[id] = dumper;

	_data_mgr.add_ext_dumper(id, dumper.get());

	WTSLogger::info("Extended dumper %s created", id);
	return true;
}

void WtDtRunner::registerExtDumper(FuncDumpBars barDumper, FuncDumpTicks tickDumper)
{
	_dumper_for_bars = barDumper;
	_dumper_for_ticks = tickDumper;
}

void WtDtRunner::registerExtHftDataDumper(FuncDumpOrdQue ordQueDumper, FuncDumpOrdDtl ordDtlDumper, FuncDumpTrans transDumper)
{
	_dumper_for_ordque = ordQueDumper;
	_dumper_for_orddtl = ordDtlDumper;
	_dumper_for_trans = transDumper;
}

bool WtDtRunner::dumpHisTicks(const char* id, const char* stdCode, uint32_t uDate, WTSTickStruct* ticks, uint32_t count)
{
	if (NULL == _dumper_for_ticks)
	{
		WTSLogger::error("Extended tick dumper not enabled");
		return false;
	}

	return _dumper_for_ticks(id, stdCode, uDate, ticks, count);
}

bool WtDtRunner::dumpHisBars(const char* id, const char* stdCode, const char* period, WTSBarStruct* bars, uint32_t count)
{
	if (NULL == _dumper_for_bars)
	{
		WTSLogger::error("Extended bar dumper not enabled");
		return false;
	}

	return _dumper_for_bars(id, stdCode, period, bars, count);
}

bool WtDtRunner::dumpHisOrdDtl(const char* id, const char* stdCode, uint32_t uDate, WTSOrdDtlStruct* items, uint32_t count)
{
	if (NULL == _dumper_for_orddtl)
	{
		WTSLogger::error("Extended order detail dumper not enabled");
		return false;
	}

	return _dumper_for_orddtl(id, stdCode, uDate, items, count);
}

bool WtDtRunner::dumpHisOrdQue(const char* id, const char* stdCode, uint32_t uDate, WTSOrdQueStruct* items, uint32_t count)
{
	if (NULL == _dumper_for_ordque)
	{
		WTSLogger::error("Extended order queue dumper not enabled");
		return false;
	}

	return _dumper_for_ordque(id, stdCode, uDate, items, count);
}

bool WtDtRunner::dumpHisTrans(const char* id, const char* stdCode, uint32_t uDate, WTSTransStruct* items, uint32_t count)
{
	if (NULL == _dumper_for_trans)
	{
		WTSLogger::error("Extended transaction dumper not enabled");
		return false;
	}

	return _dumper_for_trans(id, stdCode, uDate, items, count);
}